[core] Make CancelTask RPC Fault Tolerant #58018
Conversation
Code Review
This pull request makes the CancelTask RPC fault-tolerant by introducing an intermediary CancelLocalTask RPC to the raylet. This ensures that when a task is cancelled with force=True, the worker process is guaranteed to be killed, even if the graceful shutdown fails. The changes touch both normal task and actor task submission paths, and include a new Python test to verify the fault tolerance and idempotency of the cancellation logic.
My review identifies a critical bug in the new HandleCancelLocalTask implementation where a reply callback could be invoked twice, potentially crashing the raylet. I've also pointed out a minor issue with a misleading log message. Overall, the approach is sound, but the race condition needs to be fixed.
@dayshah PTAL
```cpp
      std::chrono::high_resolution_clock::now().time_since_epoch()) {
  cancel_retry_timer_.expires_after(boost::asio::chrono::milliseconds(
      RayConfig::instance().cancellation_retry_ms()));
  auto do_cancel_local_task =
```
I don't like this formatting change by the pre-commit hooks, yuck.....
```cpp
// Keep retrying every 2 seconds until a task is officially
// finished.
if (!task_manager_.GetTaskSpec(task_id)) {
```
There's a different task manager API to check if a task is finished / failed.
IsTaskPending() is what the normal task submitter uses, and this check doesn't do that. Ideally the raylet should be able to tell you whether to retry or not, so you shouldn't need this anyway?
Also, what does the raylet respond with if the actor successfully tried to cancel the task? It doesn't retry in that case, right?
Talked offline; this check isn't necessary since RetryCancelTask already checks via IsTaskPending whether the task spec is still present.
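A minimal sketch of that point, with `is_task_pending`, `schedule_after`, and `send_cancel_again` as generic stand-ins rather than the actual Ray task manager or timer APIs: the retry path itself bails out when the task is no longer pending, so an extra GetTaskSpec() guard before scheduling the retry is redundant.

```cpp
#include <chrono>
#include <functional>

// Sketch: the retry path checks task liveness itself, so callers don't need
// a separate task-spec lookup before arming the retry timer.
void RetryCancelIfStillPending(
    std::function<bool()> is_task_pending,
    std::function<void(std::chrono::milliseconds, std::function<void()>)> schedule_after,
    std::function<void()> send_cancel_again,
    std::chrono::milliseconds retry_interval) {
  if (!is_task_pending()) {
    return;  // Task already finished or failed; nothing left to cancel.
  }
  // Otherwise arm the retry timer and issue another cancel attempt when it fires.
  schedule_after(retry_interval, send_cancel_again);
}
```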
```cpp
reply->set_attempt_succeeded(cancel_task_reply.attempt_succeeded());
reply->set_requested_task_running(cancel_task_reply.requested_task_running());
send_reply_callback(Status::OK(), nullptr, nullptr);
timer->cancel();
```
I think there's a race here: the timer can kick off its callback after you send the reply callback but before you cancel the timer, and then you'd access reply after doing send_reply_callback and send the reply twice.
Ya, good point. I guarded against the case where both callbacks are queued, but only when the time-exceeded callback is queued first. For the case where the RPC callback is queued first, I assumed the if (current_worker) check would guard against the time-exceeded callback, but now I don't think that's always true: the IPC worker-death callback might not have been queued yet. Extended the replied flag to account for whether the time-exceeded callback or the main RPC callback executes first.
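A minimal, self-contained sketch of the `replied` guard being described, assuming both callbacks run on the same single-threaded event loop (as raylet handlers do), so a plain bool behind a shared_ptr suffices; `MakeReplyOnce` is an illustrative helper, not the actual Ray code.

```cpp
#include <cassert>
#include <functional>
#include <memory>

// Wraps a reply-sending closure so that only the first invocation sends the
// reply; any later racing callback becomes a no-op instead of touching the
// reply object again.
std::function<void()> MakeReplyOnce(std::function<void()> send_reply) {
  auto replied = std::make_shared<bool>(false);
  return [replied, send_reply = std::move(send_reply)]() {
    if (*replied) {
      return;  // Another callback already sent the reply.
    }
    *replied = true;
    send_reply();
  };
}

int main() {
  int sent = 0;
  auto reply_once = MakeReplyOnce([&sent]() { ++sent; });
  reply_once();  // e.g. the CancelTask RPC completion callback
  reply_once();  // e.g. the escalation timer firing afterwards
  assert(sent == 1);
  return 0;
}
```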
Pending merge of #58947; then this PR should be rebased and the io context posts removed.
…58947) This PR was motivated by #58018, where we call methods of the GCS node info accessor potentially from the user's Python cancel thread, which could cause thread-safety issues. I went with the trivial solution of adding a mutex to the node_cache_address_and_liveness_ cache. The one downside is that instead of returning pointers to the GcsNodeAddressAndLiveness objects in the cache, I return them by value. I didn't want to allow access to the mutex that guards the cache outside of the accessor, since I think that's a bad precedent and will create a mess.
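A rough sketch of that design; `NodeCache` and `NodeAddressAndLiveness` are illustrative stand-ins for the real accessor types. The point is that lookups copy out under the lock instead of handing out pointers into the cache, so no caller ever needs the mutex.

```cpp
#include <mutex>
#include <optional>
#include <string>
#include <unordered_map>

// Hypothetical stand-in for the cached per-node record.
struct NodeAddressAndLiveness {
  std::string ip_address;
  int port = 0;
  bool alive = false;
};

class NodeCache {
 public:
  void Upsert(const std::string &node_id, NodeAddressAndLiveness info) {
    std::lock_guard<std::mutex> lock(mutex_);
    cache_[node_id] = std::move(info);
  }

  // Returns a copy; no reference into the cache escapes the lock.
  std::optional<NodeAddressAndLiveness> Get(const std::string &node_id) const {
    std::lock_guard<std::mutex> lock(mutex_);
    auto it = cache_.find(node_id);
    if (it == cache_.end()) {
      return std::nullopt;
    }
    return it->second;
  }

 private:
  mutable std::mutex mutex_;
  std::unordered_map<std::string, NodeAddressAndLiveness> cache_;
};
```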
edoakes
left a comment
LGTM. Test failure but doesn't look related (in Serve)
```python
except asyncio.CancelledError:
    print("Cancelled!")
    signal_actor.send.remote()
    yield "hi"
```
@edoakes It looks like this test got flakier from my changes.
What I observed before my changes was:
1.) Proxy Actor sends a CancelTask RPC to ServeReplica
2.) ServeReplica processes the CancelTask RPC
3.) SignalActor.send.remote() gets sent
4.) CancelChildren doesn't find any pending child tasks to cancel
With my changes, steps 3 and 4 are flipped, and CancelChildren cancels the queued send.remote() task before it fires, so the test times out. It looks like you ran into the same issue here: https://github.com/ray-project/ray/pull/43320/files#diff-463bbcf17174b07dd1780cae9d6b719b248a0245fa029f8d8f280bf092d4db45R336 and fixed it for the other serve cancellation tests, so I moved this one to also use send_signal_on_cancellation.
Still trying to figure out why it got more flaky. I reverted back to the last commit where this PR passed CI, but it's still flaky locally for me. I'd expect the timing to change a bit due to my cancellation-path changes, but I would've thought the node status cache access in the actor/normal task submitter would slow the cancellation path down, so steps 3/4 should've been less flaky 🤔
is it deflaked after using the context manager?
abrarsheikh
left a comment
looks good from serve
```cpp
inline void SendCancelLocalTask(std::shared_ptr<gcs::GcsClient> gcs_client,
                                const NodeID &node_id,
                                std::function<void(const rpc::Address &)> cancel_callback,
                                std::function<void()> failure_callback) {
```
How do you feel about the cancel callback taking an optional address instead of having a separate failure callback here? Up to personal preference.
Mmm, I think I'll leave it as is. The cleanup for cancelled_tasks happens in cancel_callback, within the callback of CancelLocalTask, so I'd end up with two places to handle it now. I think it's clearer this way.
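For illustration, the two shapes being discussed look roughly like this; `Address` stands in for `rpc::Address`, and these signatures are a sketch of the design choice rather than the actual code.

```cpp
#include <functional>
#include <optional>
#include <string>

// Hypothetical stand-in for rpc::Address.
struct Address {
  std::string ip_address;
  int port = 0;
};

// Shape used in the PR: distinct success and failure callbacks, so the
// cancelled_tasks cleanup can live entirely inside the success path.
using CancelCallback = std::function<void(const Address &)>;
using FailureCallback = std::function<void()>;
void SendCancelLocalTaskTwoCallbacks(CancelCallback on_found, FailureCallback on_missing);

// Alternative suggested in review: one callback taking an optional address,
// with the caller branching on whether the address was resolved.
using CancelResultCallback = std::function<void(const std::optional<Address> &)>;
void SendCancelLocalTaskOptional(CancelResultCallback on_result);
```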
| << "with Worker ID: " << executor_worker_id; | ||
| if (timer) { | ||
| RAY_LOG(WARNING) << "Escalating graceful shutdown to SIGKILL instead."; | ||
| return; |
I'm unsure if both of these should be warnings. For the force_kill case (the if (timer) case), this is expected behavior, so it shouldn't be a warning imo.
Yea, that's a good point. I'll make the timer log an info log then and keep the one above as a warning.
Description
Makes the CancelTask RPC fault tolerant. Created an intermediary RPC, similar to what was done in #57648: when the force_exit flag is enabled for cancel, the executor worker is shut down gracefully. However, we have no way of determining on the owner core worker whether the shutdown succeeded, so we send the cancel request to the raylet via a new CancelLocalTask RPC that guarantees the worker is killed. Added a Python test to verify retry behavior; left out the C++ test after talking to @dayshah, since it would be complicated to cover all orderings of the owner/executor states in the cancellation process.
In the current task submission path, we don't keep track of the worker's raylet address when we receive the PushTask RPC. It's complicated to do this since the GCS manages actor lifecycles, including requesting leases, so instead of touching the hot path (task submission) we decided to complicate the cancellation path instead. Upon receiving a CancelTask RPC, we query the GCS node cache for the node info; only if it's not in the cache do we then query the GCS. Unfortunately, the GCS node cache is currently not thread safe and should only be accessed on the main io service, so we refactored Normal/ActorTaskSubmitter to post the portion of the code that accesses the cache onto the main io service.
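A sketch of that cancellation-path lookup, with `post_to_io_service`, `lookup_cached_node`, and `fetch_node_from_gcs` as hypothetical stand-ins for posting to the main io service, reading the GCS node cache, and the async GCS query; only the control flow is meant to mirror the description above.

```cpp
#include <functional>
#include <optional>
#include <string>

// Hypothetical stand-ins for the real Ray types.
struct NodeID { std::string hex; };
struct Address { std::string ip_address; int port = 0; };

void ResolveRayletAndCancel(
    const NodeID &node_id,
    std::function<void(std::function<void()>)> post_to_io_service,
    std::function<std::optional<Address>(const NodeID &)> lookup_cached_node,
    std::function<void(const NodeID &, std::function<void(std::optional<Address>)>)>
        fetch_node_from_gcs,
    std::function<void(const Address &)> send_cancel_local_task,
    std::function<void()> on_node_dead) {
  // The node cache may only be read on the main io service, so the lookup is
  // posted there instead of running on the caller's (possibly Python) thread.
  post_to_io_service([=]() {
    // Fast path: node info is already in the cache.
    if (auto cached = lookup_cached_node(node_id)) {
      send_cancel_local_task(*cached);
      return;
    }
    // Slow path: ask the GCS; the node may have died in the meantime.
    fetch_node_from_gcs(node_id, [=](std::optional<Address> fetched) {
      if (fetched) {
        send_cancel_local_task(*fetched);
      } else {
        on_node_dead();
      }
    });
  });
}
```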
There was also a race condition in CancelLocalTask/KillLocalActor where send_reply_callback could be triggered twice: if we receive the response from CancelTask/KillActor but the worker is not evicted from the raylet worker pool immediately, the callback in execute_after could also fire. Added a replied boolean flag to guard against this.
The behavior of CancelTask with force_kill=true has been modified to trigger a SIGKILL after a set amount of time if the graceful shutdown of the worker hangs.
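A minimal sketch of that escalation using a boost::asio timer; `request_graceful_exit` and `sigkill_worker` are hypothetical hooks standing in for the real shutdown RPC and kill call, and the returned timer would be cancelled if the worker exits (or the reply arrives) before the deadline.

```cpp
#include <boost/asio.hpp>
#include <chrono>
#include <functional>
#include <memory>

// Ask the worker to exit gracefully, then arm a one-shot timer that force-kills
// it if the graceful path hangs past the grace period.
std::shared_ptr<boost::asio::steady_timer> ForceCancelWithEscalation(
    boost::asio::io_context &io,
    std::function<void()> request_graceful_exit,
    std::function<void()> sigkill_worker,
    std::chrono::milliseconds grace_period) {
  request_graceful_exit();
  auto timer = std::make_shared<boost::asio::steady_timer>(io, grace_period);
  timer->async_wait([timer, sigkill_worker](const boost::system::error_code &ec) {
    if (ec == boost::asio::error::operation_aborted) {
      return;  // Worker exited in time; the timer was cancelled.
    }
    sigkill_worker();  // Graceful shutdown hung; escalate to SIGKILL.
  });
  return timer;  // Caller cancels this timer once the worker is confirmed dead.
}
```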
Lastly, the actor task cancel retry did not use the config used by the normal task retry timer; updated it to do so.